flink状态管理

您所在的位置:网站首页 flink 算子状态 flink状态管理

flink状态管理

2024-01-24 02:41| 来源: 网络整理| 查看: 265

在flink中,状态叫做State,用来保存中间结果或者一些缓存数据,对于Flink中的很多DataStream算子来说,他们都需要依赖一定中间结果即状态来进行计算。例如去重操作,CEP检测操作,Exactly Once等等。因此状态是Flink处理系统中比较重要的一环。

1. 状态数据结构种类

状态继承关系比较复杂,层级较多,但是主要常用的有以下几种数据结构的状态:

ValueState 类型为T的单值状态,可以通过value()方法来获取,并且通过update方法来更新它

ListState 类型为T的元素列表

ReducingState 使用ReducingState.add(value:T)方法立刻返回一个使用ReduceFunction聚合后的值,可以通过ReducingState.get()获取该值

MapState 保存了一组key到value的映射。这个状态提供了很多类似Java Map的方法

AggregatingState 和ReducingState行为类似,AggregateFunction相对更加通用

Internal****State 这种State主要是用于系统内部访问状态数据的方法,一般是不会给用户进行使用的

BroadcastState 用于存储BroadcastStream中的状态数据,BroadcastState中的数据会被发送到指定算子的所有事例中,并且每个实例中的数据都相同

StateUml.png

2. KeyedState 和 OperatorState

根据DataStream数据集是否基于Key进行分组,可以分为KeyedState和OperatorState两种类型。

KeyedState: KeyedStrem流上面的每一个Key都对应一个State对象。 OperatorState: OperatorState和并行的算子实例绑定,整个算子只对应一个State。 StateKeyedStateOperatorState适用范围keyedStream算子用于所有类型的算子状态个数KeyedStrem流上面的每一个Key都对应一个State对象整个算子对应一个算子状态访问方式重写RichFunction实现CheckpointedFunction或ListCheckpointed接口状态数据结构支持ValueState,ListState,ReducingState,AggregatingState,MapStateListState,BraoadcastState 3. StateBackend-存储和维护状态的地方

StateBackend主要有以下作用

stateBackend能够创建state,提供接口读取数据 当系统进行checkpoint时候,stateBackend将状态进行持久化存储。 3.1 KeyStatedBackend的实现

由下图可知,KeyStatedBackend实现了KeyedStateBackend,KeyedStateBackend继承了PriorityQueueSetFactory,KeyStateFactory。因此该类拥有PriorityQueueQueueElement优先级队列,创建InternalKvState的方法。同时该类还拥有snapshot方法,能对KeyedStateBackend 中的状态数据进行快照。 同时,HeapKeyedStateBackend和RocksDBKeyedStateBackend是KeyedStateBackend的两种基本实现类 RocksDBKeyedStateBackend是一个单独的jar包,必须单独引入,如果没有该jar包,是看不到RocksDBKeyedStateBackend的继承关系的

KeyedStateBackendUML.png

3.1.2 HeapKeyedStateBackend的介绍

HeapStateBackend是基于JVM堆内存存储的,是Flink默认支持的KeyedStateBackend。

HeapKeyedStateBackend的状态类型 由于HeapKeyedStateBackend是一个状态后端,里面存储了状态类型,而HeapKeyedStateBackend的状态类型主要是以Heap开头,例如HeapAggregatingState,HeapListState以及HeapReducingState。同时该状态类型需要继承AbstractHeapState类才能完成

StateAbstractHeapState.png

StateTable的设计与实现 在HeapKeyedStateBackend中主要是通过StateTable来存储数据结构来存储状态数据。在AbstractHeapState中定义了StateTable,在实例化AbstractHeapState时会定义这个StateTable接口的实现类。 在HeapKeyedStateBackend中,主要是通过 Map registeredKVStates来进行存储,key为我们注册定力的状态的名称,value为用来存储状态的StateTable的数据。 private final Map registeredKVStates;

StateTable.png

由上图所知,stateTable主要有CopyOnWriteStateTable和NestedMapsStateTable两种实现方式,默认情况下,一般都是使用CopyOnWriteStateTable,CopyOnWriteStateTable数据结构主要是能够在checkpoint过程中,支持异步快照操作,底层借助了CopyonWriteMap数据结构存储数据.NestedMapsStateTable底层借助了NestedStateMap数据结构存储数据元素,checkpoint过程支持同步快照

CopyOnWriteStateTable为了能够异步快照以及增量rehashing从而牺牲了一定的性能以及内存存储。

CopyOnWriteStateTable 和普通的hashTable不同,主要是采用了 K Key,N namespace两个hashkey的形式来确定一个值,没有使用嵌套的结构。主要进行存储的数据结构叫做 StateMapEntry[] primaryTable,一个StateMapEntry的list

3.2 OperatorStateBackend的实现 OperatorState可以通过OperatorStateBackend以及RawKeyedStateInput两种类型状态管理后端创建。OperatorStateBackend是Flink默认的托管算子状态的管理后端,托管状态是指由Flink框架管理的State,如ValueState,ListState,MapState.无用由用户感知管理。而原始状态,用户自定义的状态,需要开发者自己管理,使用byte数组来读写状态内容。

如下图所示,OperatorStateBackend实现了4个接口,OperatorStateStore接口提供了获取BroadcastState、ListState以及注册在OperatorStateStore中的StateNames的方法。OperatorStatebackend只有DefaultOperatorStateBackend一个默认实现类。同时,所有算子的状态数据只能存储在JVM的内存中。

OperatorStateBackendUML.png

StateBackend的整体设计

StateBackend是一个接口,定义了流式应用的状态的存储的保存方式以及checkpoint的方式。不同的StateBackend存储的方式不同。 如下图所示,StateBackend的主要实现类是MemoryStateBackend,FsStateBackend和RocksDBackend. StateBackend接口中包含了createKeyedStateBackend(),ceateOperarotStateBackend()等方法,来获取上文中的KeyedStateBackend以及OperatorStateBackend。三种状态后端的主要区别在于采购员北方华北的JetedStateBackend和checkpointStorage不一样

StateBackendUML.png

nameMemoryStateBackendFsStateBackendRocksDBackendKeyedStatebackendHeapKeyedStateBackendHeapKeyedStateBackendRocksDBKeyedStateBackendcheckpoint数据JobManager堆内存Checkpoint数据存储在指定文件嵌入式的本地数据库RocksDB使用场景本地开发测试调试可用与产线环境,适用于高可用方案相对于FSstateBackend,基于RocksDB的LSM—Tree内存数据结构,更大批量的状态数据存储。目前唯一支持的增量检查点的后端注意点受限于JobManger的内存大小.每个State默认5MB,可通过构造函数调整.每个State不能超过AkkaFrame大小state数据会被存在TaskManager里面,不能超过TaskManager内存。TaskManager异步把数据局写入外部存储总State大小受限于磁盘大小,不受内存限制。RocksDB需要配置外部系统文件保存State。RocksDB的JNI API基于byte,单key和单value不能超过2的31次方 4.参考文献

Flink设计与实现核心原理与源码解析



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3